Skip to content

[DSL] POC2#3299

Open
GuillaumeDSM wants to merge 18 commits intodsl_TMfrom
dsl_poc2
Open

[DSL] POC2#3299
GuillaumeDSM wants to merge 18 commits intodsl_TMfrom
dsl_poc2

Conversation

@GuillaumeDSM
Copy link
Member

requires #3277

@GuillaumeDSM GuillaumeDSM self-assigned this Mar 9, 2026
@GuillaumeDSM GuillaumeDSM requested a review from Herklos as a code owner March 9, 2026 20:53
@GuillaumeDSM GuillaumeDSM force-pushed the dsl_poc2 branch 9 times, most recently from 0db3ad0 to 87222d5 Compare March 12, 2026 10:44
@GuillaumeDSM GuillaumeDSM changed the title [DSL] add resolved_params update octobot_node [DSL] POC2 Mar 12, 2026
Copy link
Member

@Herklos Herklos left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great feature!!



@contextlib.contextmanager
def logged_waiter(self, name: str, sleep_time: float = 30) -> typing.Generator[None, None, None]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines +24 to +34
class BaseHistory(pydantic.BaseModel):
completed_iterations: int = 1
created_orders: list[dict] = pydantic.Field(default_factory=list)
cancelled_orders: list[str] = pydantic.Field(default_factory=list)
transfers: list[dict] = pydantic.Field(default_factory=list)

def update(self, history: "BaseHistory"):
self.completed_iterations += history.completed_iterations
self.created_orders.extend(history.created_orders) # pylint: disable=no-member
self.cancelled_orders.extend(history.cancelled_orders) # pylint: disable=no-member
self.transfers.extend(history.transfers) # pylint: disable=no-member
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to not be a "BaseHistory" that any history could inherit from but a "TradingHistory" isn't it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, thanks 👍

octobot_node.scheduler.SCHEDULER.INSTANCE = dbos.DBOS


class TestSchedulerRecovery:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines +24 to +32
def is_valid_symbol(self, exchange_name: str, exchange_type: str, sandboxed: bool, symbol: str) -> bool:
try:
# will raise if symbol is missing (therefore invalid)
self._ALL_TICKERS_BY_EXCHANGE_KEY[ # pylint: disable=expression-not-assigned
self.get_exchange_key(exchange_name, exchange_type, sandboxed)
][symbol]
return True
except KeyError:
return False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we remove this function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once fully migrated to ccxt cache, yes

@@ -0,0 +1,54 @@
# Drakkar-Software OctoBot-Trading
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we move these functions outside of test_tools?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😂 I think we should

':read ECONNRESET:read ETIMEDOUT'
)
).split(":"))
USE_CCXT_SHARED_MARKETS_CACHE = os_util.parse_boolean_environment_var("USE_CCXT_SHARED_MARKETS_CACHE", "True")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@@ -512,12 +710,35 @@ async def test_run_multiple_actions_bundle_no_wait(self, multiple_actions_bundle


async def test_run_multiple_actions_bundle_with_wait(self, multiple_action_bundle_with_wait):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines +35 to +31
if task.type == octobot_node.models.TaskType.START_OCTOBOT.value:
handle = await SCHEDULER.BOT_WORKFLOW_QUEUE.enqueue_async(
full_bot_workflow.FullBotWorkflow.start,
t=workflow_base.Tracker(name=f"{task.name}_{_generate_instance_name()}"),
inputs=full_bot_workflow.FullBotWorkflowStartInputs(task=task, delay=delay).to_dict(include_default_values=False)
)
elif task.type == octobot_node.models.TaskType.STOP_OCTOBOT.value:
handle = await SCHEDULER.BOT_WORKFLOW_QUEUE.enqueue_async(
full_bot_workflow.FullBotWorkflow.stop,
t=workflow_base.Tracker(name=f"{task.name}_{_generate_instance_name()}"),
inputs=full_bot_workflow.FullBotWorkflowStopInputs(task=task, delay=delay).to_dict(include_default_values=False)
)
elif task.type == octobot_node.models.TaskType.EXECUTE_ACTIONS.value:
handle = await SCHEDULER.BOT_WORKFLOW_QUEUE.enqueue_async(
bot_workflow.BotWorkflow.execute_octobot,
t=workflow_base.Tracker(name=f"{task.name}_{_generate_instance_name()}"),
inputs=bot_workflow.BotWorkflowInputs(task=task, delay=delay).to_dict(include_default_values=False)
if task.type == octobot_node.models.TaskType.EXECUTE_ACTIONS.value:
handle = await SCHEDULER.AUTOMATIONS_WORKFLOW_QUEUE.enqueue_async(
automations_workflow.AutomationsWorkflow.execute_automations,
t=params.Tracker(name=workflows_util.generate_workflow_name(task.name)),
inputs=params.AutomationsWorkflowInputs(task=task, delay=delay).to_dict(include_default_values=False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

Comment on lines +133 to +135
created_orders=result.get_created_orders(),
cancelled_orders=result.get_cancelled_orders(),
transfers=result.get_deposit_and_withdrawal_details(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it represent the bot state? What will this data be used for?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the summary of all the actions of an automation that is kept during the whole life of the automation.
The idea is to quickly know the most important things done by an automation during its lifetime

@GuillaumeDSM GuillaumeDSM force-pushed the dsl_poc2 branch 9 times, most recently from 6830064 to 1d4bd0e Compare March 16, 2026 14:52
@GuillaumeDSM GuillaumeDSM force-pushed the dsl_poc2 branch 5 times, most recently from fff305a to 28a066f Compare March 17, 2026 09:01
@GuillaumeDSM GuillaumeDSM force-pushed the dsl_poc2 branch 3 times, most recently from 8ccb0cd to 9c6a19a Compare March 17, 2026 11:27
@GuillaumeDSM GuillaumeDSM force-pushed the dsl_poc2 branch 2 times, most recently from 436a521 to ef4d029 Compare March 17, 2026 20:44
Copy link
Member

@Herklos Herklos left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome implementation 💯 very nice job !!

)


class ContextBasedFileHandler(logging.Handler):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍



@contextlib.contextmanager
def decrypted_bots_configurations(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

): # pylint: disable=undefined-variable
if auth_details.encrypted:
raise NotImplementedError("_decrypt_exchange_credentials not implemented")
# todo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀

Comment on lines +41 to +43
def has_pending_groups(self) -> bool:
# TODO
return False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll implement this function later on, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly

# 2. process on filled and cancelled orders actions if necessary
await self._process_on_filled_and_cancelled_orders_actions()
# 3. update strategy if necessary
changed_elements, next_execution_scheduled_to = await self._execute_actions()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we imagine to infer pre-action and post actions actions based on the action to be executed. I mean if it's a "TradingAction" then we expect orders and so call _process_on_filled_and_cancelled_orders_actions but if it's a "CheckAction" or a "whateverAction" it may also need some pre-actions and post-actions that wont be orders processing.
Does it make sense?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, post actions are implemented (it's how the stop_automation dsl keyword works), we could also imagine pre_actions if we need them (possibly identified via dependencies)

@@ -0,0 +1,81 @@
class MiniOctobotError(Exception):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, it's up

next_actions = result.actions_dag.get_executable_actions()
remaining_steps = len(result.actions_dag.get_pending_actions())
next_step_at = result.next_actions_description.get_next_execution_time()
# TODO next_iteration_description should be encrypted if encryption is enabled
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀

Comment on lines +112 to +115
# TMP: add a simulated portfolio to the params
parsed_description["params"]["SIMULATED_PORTFOLIO"] = {
"ETH": 1,
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add it now or later?

await self.INSTANCE.delete_workflows_async(merged_to_delete_workflow_ids, delete_children=False)
self.logger.info(f"Vacuuming database")
with self.INSTANCE._sys_db.engine.begin() as conn:
conn.execute(sqlalchemy.text("VACUUM"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

"""


class WaitOperator(dsl_interpreter.PreComputingCallOperator, dsl_interpreter.ReCallableOperatorMixin):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants